# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, os, hashlib, gzip, sys, warnings
import itertools as it
try:
import cPickle as pickle
except ImportError:
import pickle
from abc import ABCMeta, abstractmethod
from hysop import __KERNEL_DEBUG__
from hysop.tools.units import time2str
from hysop.tools.contexts import Timer
from hysop.tools.htypes import check_instance
from hysop.tools.io_utils import IO
from hysop.tools.misc import previous_pow2
from hysop.tools.numpywrappers import npw
from hysop.tools.cache import load_cache, update_cache
from hysop.backend.device.autotunable_kernel import (
AutotunableKernel,
AutotunerWorkConfiguration,
)
from hysop.backend.device.kernel_statistics import KernelStatistics
from hysop.backend.device.kernel_autotuner_statistics import AutotunedKernelStatistics
from hysop.backend.device.codegen import CodeGeneratorWarning
class KernelGenerationError(RuntimeError):
pass
class KernelAutotuner(metaclass=ABCMeta):
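    # FULL_RESULTS_KEY: cache entry holding the full record of the best candidate.
    # DUMP_LAST_TUNED_KERNEL: dump the source and oclgrind isolation file of each
    # kernel while it is being tuned (debugging aid).
    # STORE_FULL_KERNEL_SOURCES: keep full kernel sources and hash logs in the
    # cached best-candidate record, at the cost of larger cache files.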
FULL_RESULTS_KEY = "__FULL_RESULTS__"
DUMP_LAST_TUNED_KERNEL = False
STORE_FULL_KERNEL_SOURCES = False
@staticmethod
def _hash_func():
return hashlib.new("sha256")
def use_tmp_cache(self):
self._cache_dir = IO.get_tmp_dir("kernel_autotuner")
def use_system_cache(self):
self._cache_dir = IO.cache_path() + "/kernel_autotuner"
def cache_dir(self):
assert self._cache_dir is not None
return self._cache_dir
def cache_file(self):
cache_file = "{}/{}.pklz".format(self.cache_dir(), self.name.replace(" ", "_"))
return cache_file
def _reload_cache(self, extra_kwds_hash):
cache_file = self.cache_file()
if self.verbose:
print(self.indent(1) + f">Loading cached results from '{cache_file}'.")
self.all_results = load_cache(cache_file)
config_key = self.autotuner_config_key()
config_key += (extra_kwds_hash,)
self.config_key = config_key
self.results = self.all_results.setdefault(config_key, {})
return self.results
def _dump_cache(self, silent=False):
cache_file = self.cache_file()
if (not silent) and (self.verbose > 1):
print(self.indent(1) + f">Caching results to '{cache_file}'.")
update_cache(cache_file, self.config_key, self.results)
def __init__(self, name, tunable_kernel, **kwds):
"""
Initialize a KernelAutotuner.
Parameters
----------
name: str
Name of this Autotuner for logging and caching purposes.
tunable_kernel: TunableKernel
The kernel to be tuned.
"""
super().__init__(**kwds)
check_instance(name, str)
check_instance(tunable_kernel, AutotunableKernel)
self.name = name
self.tunable_kernel = tunable_kernel
self.autotuner_config = tunable_kernel.autotuner_config
self.build_opts = tunable_kernel.build_opts
self.indent = lambda i: " " * i
self.verbose = self.autotuner_config.verbose
self.result_keys = (
"extra_parameters", # 00
"work_size", # 01
"work_load", # 02
"global_work_size", # 03
"local_work_size", # 04
"program", # 05
"kernel", # 06
"kernel_statistics", # 07
"kernel_src", # 08
"kernel_name", # 09
"src_hash", # 10
"extra_kwds_hash", # 10
"extra_kwds_hash_logs", # 12
)
for i, pname in enumerate(self.result_keys):
setattr(self, f"{pname}_idx", i)
self._cache_dir = None
def autotune(
self, extra_kwds, first_working=False, force_verbose=False, force_debug=False
):
"""
Autotune the target tunable_kernels.
Parameters
----------
first_working:
Disable caching, build and execute first valid kernel at most one time.
extra_kwds: dict
Extra keywords used to tune the kernel.
"""
tkernel = self.tunable_kernel
autotuner_config = self.autotuner_config
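        # Hash the extra keywords so that tuning results obtained for different
        # problem configurations are cached and looked up under distinct keys.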
extra_kwds_hash, extra_kwds_hash_logs = tkernel.hash_extra_kwds(extra_kwds)
hasher = self._hash_func()
hasher.update(str(extra_kwds_hash).encode("utf-8"))
extra_kwds_hash = hasher.hexdigest()
check_instance(extra_kwds_hash, str)
check_instance(extra_kwds_hash_logs, str)
file_basename = f"{self.name}_{extra_kwds_hash[:4]}"
self._print_header(extra_kwds)
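        # Cache selection: override_cache redirects results to a temporary folder
        # so that every kernel is benched again; otherwise the persistent system
        # cache is reused across runs.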
if autotuner_config.override_cache:
if self.verbose:
print(
self.indent(1)
+ ">Using temporary cache folder, benching all new kernels."
)
self.use_tmp_cache()
else:
self.use_system_cache()
results = self._reload_cache(extra_kwds_hash)
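        # In first_working mode the cached best candidate is ignored: the first
        # kernel that builds and runs correctly is kept after a single run.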
if first_working:
best_candidate = None
else:
best_candidate = self._load_results_from_cache(
tkernel,
results,
extra_kwds,
force_verbose,
force_debug,
extra_kwds_hash,
extra_kwds_hash_logs,
file_basename,
)
if best_candidate is None:
best_candidate = self._autotune_kernels(
tkernel,
results,
extra_kwds,
force_verbose,
force_debug,
first_working,
extra_kwds_hash,
extra_kwds_hash_logs,
file_basename,
)
from_cache = False
else:
from_cache = True
assert len(self.result_keys) == len(best_candidate)
return dict(zip(self.result_keys, best_candidate)), file_basename, from_cache
def _load_results_from_cache(
self,
tkernel,
results,
extra_kwds,
force_verbose,
force_debug,
extra_kwds_hash,
extra_kwds_hash_logs,
file_basename,
):
if self.FULL_RESULTS_KEY not in results:
if self.verbose:
print(
" >No best candidate was cached for this configuration, "
"benching all kernels."
)
return None
if self.verbose:
print(" >Retrieving best candidate from cache.")
        # Deep copy best_candidate so that program and kernel objects
        # do not spill into the cache (the results dictionary is mutable
        # and reused for all cache updates): pyopencl kernel and program
        # objects cannot be pickled.
best_candidate = copy.deepcopy(results[self.FULL_RESULTS_KEY])
(
extra_parameters,
work_size,
work_load,
global_work_size,
local_work_size,
prg,
kernel,
statistics,
cached_kernel_src,
cached_kernel_name,
cached_src_hash,
cached_kernel_hash,
cached_kernel_hash_logs,
) = best_candidate
if cached_kernel_hash != extra_kwds_hash:
msg = "\nCached kernel extra_kwds hash did not match the benched one:\n {}\n {}\n"
msg += "\nThis might be due to an upgrade of the generated code or "
msg += "a faulty implementation of {}.hash_extra_kwds()."
msg = msg.format(
cached_kernel_hash, extra_kwds_hash, type(tkernel).__name__
)
warnings.warn(msg, CodeGeneratorWarning)
return None
assert prg is None
assert kernel is None
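        # Program and kernel objects are never stored in the cache; rebuild them
        # below from the regenerated kernel source.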
global_work_size = npw.asintegerarray(global_work_size)
local_work_size = npw.asintegerarray(local_work_size)
kernel_name, kernel_src = tkernel.generate_kernel_src(
global_work_size=global_work_size,
local_work_size=local_work_size,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
tuning_mode=False,
dry_run=False,
)
hasher = self._hash_func()
hasher.update(kernel_src.encode("utf-8"))
src_hash = hasher.hexdigest()
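        # The source regenerated from the cached parameters must hash to the cached
        # value, otherwise the cached statistics refer to a different kernel.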
if kernel_name != cached_kernel_name:
msg = "\nCached kernel name did not match the benched one:\n {}\n {}\n"
msg += "\nThis might be due to an upgrade of the generated code or "
msg += "a faulty implementation of {}.hash_extra_kwds()."
msg = msg.format(kernel_name, cached_kernel_name, type(tkernel).__name__)
warnings.warn(msg, CodeGeneratorWarning)
return None
if src_hash != cached_src_hash:
msg = "\nCached kernel source hash did not match the benched one.\n {}\n {}"
msg += "\nThis might be due to an upgrade of the generated code or "
msg += "a faulty implementation of {}.hash_extra_kwds()."
msg = msg.format(src_hash, cached_src_hash, type(tkernel).__name__)
if self.STORE_FULL_KERNEL_SOURCES:
if cached_kernel_src is not None:
cached_src = "/tmp/cached.cl"
tuned_src = "/tmp/tuned.cl"
with open(cached_src, "w") as f:
f.write(cached_kernel_src)
with open(tuned_src, "w") as f:
f.write(kernel_src)
msg += f"\nMatching cached kernel sources dumped to '{cached_src}'."
msg += f"\nCurrently tuned kernel sources dumped to '{tuned_src}'."
if cached_kernel_hash_logs is not None:
cached_src = "/tmp/cached_hash_logs.txt"
tuned_src = "/tmp/tuned_hash_logs.txt"
with open(cached_src, "w") as f:
f.write(cached_kernel_hash_logs)
with open(tuned_src, "w") as f:
f.write(extra_kwds_hash_logs)
msg += f"\nMatching cached kernel sources dumped to '{cached_src}'."
msg += f"\nCurrently tuned kernel sources dumped to '{tuned_src}'."
warnings.warn(msg, CodeGeneratorWarning)
return None
try:
(prg, kernel) = self.build_from_source(
kernel_name=kernel_name,
kernel_src=kernel_src,
build_options=self.build_opts,
force_verbose=force_verbose,
force_debug=force_debug,
)
except Exception as e:
msg = (
"Could not use cached kernel because there was a problem during build:"
)
msg += f"\n {e}"
print(msg)
return None
try:
self.check_kernel(
tkernel=tkernel,
kernel=kernel,
global_work_size=global_work_size,
local_work_size=local_work_size,
)
except Exception as e:
msg = "Could not use cached kernel because the following error occured during checkup:"
msg += f"\n {e}"
print(msg)
return None
best_candidate[self.program_idx] = prg
best_candidate[self.kernel_idx] = kernel
best_candidate[self.kernel_src_idx] = kernel_src
best_candidate[self.extra_kwds_hash_logs_idx] = extra_kwds_hash_logs
return tuple(best_candidate)
def _autotune_kernels(
self,
tkernel,
results,
extra_kwds,
force_verbose,
force_debug,
first_working,
extra_kwds_hash,
extra_kwds_hash_logs,
file_basename,
):
autotuner_config = self.autotuner_config
if first_working:
nruns = 1
else:
nruns = autotuner_config.nruns
max_candidates = extra_kwds.get(
"max_candidates", autotuner_config.max_candidates
)
bench_results = {}
best_stats = None
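        # best_stats holds the statistics of the fastest candidate seen so far; it is
        # passed to the bench so that clearly slower candidates can be pruned early.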
step_count = 0
self._print_step(step_count, "all", nruns)
ks = AutotunedKernelStatistics(tkernel, extra_kwds)
ks.max_candidates = max_candidates
ks.nruns = nruns
ks.file_basename = file_basename
with Timer() as timer:
params = tkernel.compute_parameters(extra_kwds=extra_kwds)
total_count, pruned_count, kept_count, failed_count = 0, 0, 0, 0
abort = False
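            # Exhaustive search: iterate over extra parameter sets, then over work
            # loads, then over candidate local work sizes; each combination is
            # generated, built and benched (or reloaded from the per-candidate cache).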
for extra_parameters in params.iter_parameters():
extra_param_hash = tkernel.hash_extra_parameters(extra_parameters)
try:
(max_kernel_work_group_size, preferred_work_group_size_multiple) = (
self.collect_kernel_infos(
tkernel=tkernel,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
)
)
pks = ks.push_parameters(
extra_param_hash,
extra_parameters=extra_parameters,
max_kernel_work_group_size=max_kernel_work_group_size,
preferred_work_group_size_multiple=preferred_work_group_size_multiple,
)
except Exception as e:
msg = "Autotuner could not determine kernel info for parameters {} because of the following KernelGenerationError:\n{}\n"
msg = msg.format(extra_parameters, e)
warnings.warn(msg, CodeGeneratorWarning)
pks = ks.push_parameters(
extra_param_hash, extra_parameters=extra_parameters
)
continue
work_bounds = tkernel.compute_work_bounds(
max_kernel_work_group_size=max_kernel_work_group_size,
preferred_work_group_size_multiple=preferred_work_group_size_multiple,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
)
work_size = work_bounds.work_size
self._print_parameters(extra_parameters, work_bounds)
args_mapping = tkernel.compute_args_mapping(
extra_kwds=extra_kwds, extra_parameters=extra_parameters
)
isolation_params = extra_kwds.get("isolation_params", None)
msg = "Could not extract kernel arguments."
assert "kernel_args" in extra_kwds, msg
kernel_args = extra_kwds["kernel_args"]
check_instance(kernel_args, dict, keys=str)
args_list = self._compute_args_list(
args_mapping=args_mapping, **kernel_args
)
for work_load in work_bounds.iter_work_loads():
work = tkernel.compute_work_candidates(
work_bounds=work_bounds,
work_load=work_load,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
)
self._print_workload(work_load, work)
for local_work_size in work.iter_local_work_size():
global_work_size = tkernel.compute_global_work_size(
local_work_size=local_work_size,
work=work,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
)
run_key = (
extra_param_hash,
tuple(work_load),
tuple(global_work_size),
tuple(local_work_size),
)
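                        # Per-candidate cache key: extra parameter hash, work load,
                        # global work size and local work size.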
pruned = None
try:
kernel_name, kernel_src = tkernel.generate_kernel_src(
global_work_size=global_work_size,
local_work_size=local_work_size,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
tuning_mode=True,
dry_run=False,
)
hasher = self._hash_func()
hasher.update(kernel_src.encode("utf-8"))
src_hash = hasher.hexdigest()
if run_key in results:
(cache_src_hash, cache_stats) = results[run_key]
if cache_src_hash != src_hash:
msg = "\nCached parameters candidate did not match the "
msg += "benched one.\n {}\n {}"
msg += "\nThis might be due to an upgrade of the generated "
msg += "code or a faulty implementation of "
msg += "{}.hash_extra_kwds()."
msg = msg.format(
src_hash, cache_src_hash, type(tkernel).__name__
)
warnings.warn(msg, CodeGeneratorWarning)
old_stats = None
else:
old_stats = cache_stats
else:
old_stats = None
from_cache = old_stats is not None
(prg, kernel, statistics, pruned) = (
self.bench_one_from_source(
tkernel=tkernel,
kernel_name=kernel_name,
kernel_src=kernel_src,
args_list=args_list,
args_mapping=args_mapping,
isolation_params=isolation_params,
target_nruns=nruns,
old_stats=old_stats,
best_stats=best_stats,
global_work_size=global_work_size,
local_work_size=local_work_size,
force_verbose=force_verbose,
force_debug=force_debug,
)
)
check_instance(statistics, KernelStatistics)
assert statistics.nruns >= 1
if pruned:
pruned_count += 1
else:
kept_count += 1
if (best_stats is None) or (
statistics.mean < best_stats.mean
):
local_best = True
best_stats = statistics
else:
local_best = False
candidate = (
extra_parameters,
tuple(work_size),
tuple(work_load),
tuple(global_work_size),
tuple(local_work_size),
prg,
kernel,
statistics,
kernel_src,
kernel_name,
src_hash,
extra_kwds_hash,
extra_kwds_hash_logs,
)
results[run_key] = (src_hash, statistics)
bench_results[run_key] = candidate
pks.push_run_statistics(
run_key,
work_size=work_size,
work_load=work_load,
local_work_size=local_work_size,
global_work_size=global_work_size,
statistics=statistics,
pruned=pruned,
local_best=local_best,
error=None,
)
except KernelGenerationError as e:
if __KERNEL_DEBUG__:
sys.stderr.write(str(e) + "\n")
failed_count += 1
statistics = None
from_cache = False
pks.push_run_statistics(
run_key,
work_size=work_size,
work_load=work_load,
local_work_size=local_work_size,
global_work_size=global_work_size,
statistics=None,
pruned=None,
local_best=None,
error=e,
)
total_count += 1
abort = (max_candidates is not None) and (
(pruned_count + kept_count) >= max_candidates
)
abort |= first_working and kept_count == 1
self._print_full_candidate(
local_work_size,
global_work_size,
statistics,
pruned,
from_cache,
)
self._print_candidate(
(statistics is None), from_cache, total_count, abort
)
if abort:
break
if abort:
break
self._dump_cache(silent=True)
if abort:
break
if abort:
if first_working:
msg = ">Achieved first running kernel."
else:
msg = ">Achieved maximum number of configured candidates: {}"
msg = msg.format(max_candidates)
if self.verbose > 1:
print(msg)
assert total_count == (kept_count + pruned_count + failed_count)
if kept_count == 0:
msg = "No bench result were generated out of {} runs "
msg += "(kept_count={}, pruned_count={}, failed_count={}), aborting."
msg = msg.format(total_count, kept_count, pruned_count, failed_count)
raise RuntimeError(msg)
keep_only = max(previous_pow2(kept_count), 1)
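            # Keep only the best power-of-two number of candidates so the elimination
            # loop below can halve the field evenly at each step.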
self._print_first_step_results(
total_count, kept_count, pruned_count, failed_count, keep_only
)
candidates = tuple(
sorted(
bench_results.items(),
key=lambda x: x[1][self.kernel_statistics_idx],
)
)
candidates = candidates[:keep_only]
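            # Elimination rounds: double the number of runs, re-bench the surviving
            # candidates, re-sort them by their measured statistics and keep the best
            # half (previous power of two) until a single candidate remains.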
while len(candidates) > 1:
step_count += 1
nruns *= 2
self._print_step(step_count, f"{len(candidates)} BEST", nruns)
for run_key, run_params in candidates:
(
extra_params,
work_size,
work_load,
global_work_size,
local_work_size,
_,
kernel,
old_stats,
_,
_,
_,
_,
_,
) = run_params
self.bench_one_from_binary(
kernel=kernel,
target_nruns=nruns,
old_stats=old_stats,
best_stats=best_stats,
global_work_size=global_work_size,
local_work_size=local_work_size,
)
candidates = tuple(
sorted(candidates, key=lambda x: x[1][self.kernel_statistics_idx])
)
self._print_step_results(candidates, self.kernel_statistics_idx)
candidates = candidates[: max(previous_pow2(len(candidates)), 1)]
ks.push_step(step_count, candidates)
best_candidate = candidates[0][1]
self._print_footer(ellapsed=timer.interval, best_candidate=best_candidate)
if autotuner_config.filter_statistics(file_basename):
ks.exec_time = timer.interval
ks.best_candidate = best_candidate
ks.kernel_name = self.name
ks.kept_count = kept_count
ks.pruned_count = pruned_count
ks.failed_count = failed_count
ks.total_count = total_count
ks.extra_kwds_hash = best_candidate[self.extra_kwds_hash_idx]
if autotuner_config.plot_statistics and not first_working:
ks.plot()
# Regenerate final kernel
best_candidate = list(best_candidate)
self._build_final_kernel(tkernel, best_candidate, extra_kwds)
returned_best_candidate = tuple(best_candidate)
# Export best candidate results
if not self.STORE_FULL_KERNEL_SOURCES:
best_candidate[self.kernel_src_idx] = None
best_candidate[self.extra_kwds_hash_logs_idx] = None
best_candidate[self.program_idx] = None
best_candidate[self.kernel_idx] = None
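        # OpenCL program and kernel objects cannot be pickled, so only the remaining
        # metadata and statistics of the best candidate are written to the cache.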
results[self.FULL_RESULTS_KEY] = best_candidate
self._dump_cache()
return returned_best_candidate
def _build_final_kernel(self, tkernel, best_candidate, extra_kwds):
(
extra_parameters,
work_size,
work_load,
global_work_size,
local_work_size,
_,
_,
_,
_,
_,
_,
_,
_,
) = best_candidate
global_work_size = npw.asintegerarray(global_work_size)
local_work_size = npw.asintegerarray(local_work_size)
kernel_name, kernel_src = tkernel.generate_kernel_src(
global_work_size=global_work_size,
local_work_size=local_work_size,
extra_parameters=extra_parameters,
extra_kwds=extra_kwds,
tuning_mode=False,
dry_run=False,
)
hasher = self._hash_func()
hasher.update(kernel_src.encode("utf-8"))
src_hash = hasher.hexdigest()
(prg, kernel) = self.build_from_source(
kernel_name=kernel_name,
kernel_src=kernel_src,
build_options=self.build_opts,
force_verbose=None,
force_debug=None,
)
self.check_kernel(
tkernel=tkernel,
kernel=kernel,
global_work_size=global_work_size,
local_work_size=local_work_size,
)
best_candidate[self.program_idx] = prg
best_candidate[self.kernel_idx] = kernel
best_candidate[self.kernel_src_idx] = kernel_src
best_candidate[self.src_hash_idx] = src_hash
return best_candidate
def _compute_args_list(self, args_mapping, **kernel_args):
"""
Compute argument list from input keywords and args_mapping.
"""
A = set(args_mapping.keys())
B = set(kernel_args.keys())
if A != B:
if A - B:
msg = "Missing kernel arguments {}.".format(
", ".join(f"'{argname}'" for argname in (A - B))
)
elif B - A:
msg = "Unknown kernel arguments {}.".format(
", ".join(f"'{argname}'" for argname in (B - A))
)
else:
msg = "The impossible happened."
raise ValueError(msg)
args_list = [
None,
] * len(args_mapping)
arg_indices = {ka[0] for ka in args_mapping.values()}
if arg_indices != set(range(len(arg_indices))):
msg = "Illformed argument position mapping:\n"
msg += "\n".join(
f" >argument {argpos}: {argname}"
for (argname, argpos) in zip(args_mapping.keys(), arg_indices)
)
msg += "\nExpected contiguous integer argument positions."
raise ValueError(msg)
for arg_name, arg_value in kernel_args.items():
if arg_name not in args_mapping:
msg = "Unknown argument {}, valid ones are {}."
msg = msg.format(arg_name, ", ".join(args_mapping.keys()))
raise ValueError(msg)
(arg_index, arg_types) = args_mapping[arg_name]
if isinstance(arg_types, npw.dtype):
msg = None
if not isinstance(arg_value, npw.ndarray):
msg = "Argument {} at position {} should be a np.ndarray, got a {}."
msg = msg.format(arg_name, arg_index, type(arg_value))
elif not arg_value.dtype == arg_types:
msg = "Argument {} at position {} is a np.ndarray of wrong dtype, "
msg += "got a {}, expected a {}."
msg = msg.format(arg_name, arg_index, type(arg_value), arg_types)
elif not arg_value.size == 1:
msg = "Argument {} at position {} is not a scalar np.ndarray, "
msg += "shape={}, size={}."
msg = msg.format(
arg_name, arg_index, arg_value.shape, arg_value.size
)
if msg is not None:
raise ValueError(msg)
elif not isinstance(arg_value, arg_types):
msg = "Argument {} at position {} should be of type {} but got a {}."
msg = msg.format(arg_name, arg_index, arg_types, type(arg_value))
raise TypeError(msg)
args_list[arg_index] = arg_value
        return tuple(args_list)
@abstractmethod
def autotuner_config_key(self):
"""Caching key for autotuner configurations."""
pass
def bench_one_from_source(
self,
tkernel,
kernel_name,
kernel_src,
args_list,
args_mapping,
isolation_params,
global_work_size,
local_work_size,
target_nruns,
old_stats,
best_stats,
force_verbose,
force_debug,
):
"""
Compile and bench one kernel by executing it nruns times.
        Return the compiled program, the kernel, its KernelStatistics and
        whether it was pruned or not.
"""
(prg, kernel) = self.build_from_source(
kernel_name=kernel_name,
kernel_src=kernel_src,
build_options=self.build_opts,
force_verbose=force_verbose,
force_debug=force_debug,
)
self.check_kernel_args(kernel, args_list)
for i, arg in enumerate(args_list):
try:
kernel.set_arg(i, arg)
except:
msg = "Failed to set opencl kernel argument {} which is of type {}.\n"
msg = msg.format(i, type(arg))
print(msg)
raise
if self.DUMP_LAST_TUNED_KERNEL:
name = "currently_tuned"
kernel_src_file = tkernel.generate_source_file(name, kernel_src, force=True)
kernel_sim_file = tkernel.generate_oclgrind_isolation_file(
kernel=kernel,
kernel_name=name,
kernel_source=kernel_src_file,
global_work_size=global_work_size,
local_work_size=local_work_size,
args_list=args_list,
args_mapping=args_mapping,
isolation_params=isolation_params,
force=True,
)
print(f"Current tuned kernel has been dumped:")
print(f" {kernel_sim_file}")
bench_results = self.bench_one_from_binary(
kernel=kernel,
target_nruns=target_nruns,
old_stats=old_stats,
best_stats=best_stats,
global_work_size=global_work_size,
local_work_size=local_work_size,
)
return (prg, kernel) + bench_results
@abstractmethod
def build_from_source(
self, kernel_name, kernel_src, build_options, force_verbose, force_debug
):
"""
Compile one kernel from source.
Return the compiled program and the kernel.
"""
pass
@abstractmethod
def bench_one_from_binary(
self,
kernel,
global_work_size,
local_work_size,
target_nruns,
old_stats,
best_stats,
force_verbose,
force_debug,
):
"""
        Bench one already compiled kernel by executing it nruns times.
        Return its KernelStatistics and whether it was pruned or not.
"""
pass
@abstractmethod
def collect_kernel_infos(self, tkernel, extra_parameters, extra_kwds):
"""
Collect kernel infos before computing workload and work group size.
"""
pass
@abstractmethod
def check_kernel(self, tkernel, kernel, global_work_size, local_work_size):
pass
@abstractmethod
def check_kernel_args(self, kernel, args_list):
pass
def _print_separator(self):
print("_" * 80)
def _print_header(self, extra_kwds):
verbose = self.verbose
if verbose:
self._print_separator()
print(f"\n|| KERNEL {self.name.upper()} AUTOTUNING")
print(
"\n *config: {} (nruns={}, prune={}, max_candidates={})".format(
self.autotuner_config.autotuner_flag,
self.autotuner_config.nruns,
self.autotuner_config.prune_threshold,
extra_kwds.get(
"max_candidates", self.autotuner_config.max_candidates
),
)
)
print(" *build_opts: {}".format(self.tunable_kernel.build_opts or "None"))
return verbose
def _print_parameters(self, extra_parameters, work_bounds):
if self.verbose > 2:
self._print_separator()
msg = self.indent(1) + "::Current tuning parameters:: {}"
msg = msg.format(extra_parameters)
msg0 = "\n" + self.indent(1)
msg0 += " work_size={}, min_work_load={}, max_work_load={}"
msg += msg0.format(
work_bounds.work_size,
work_bounds.min_work_load,
work_bounds.max_work_load,
)
print(msg)
def _print_workload(self, work_load, work):
if self.verbose > 2:
msg = (
"\n"
+ self.indent(2)
+ "::Current workload {}, global_work_size set to {}::"
)
msg = msg.format(work_load, work.global_work_size)
print(msg)
if self.verbose < 4:
self._print_separator()
def _print_first_step_results(
self, total_count, kept_count, pruned_count, failed_count, keep_only
):
verbose = self.verbose
if verbose > 1:
if verbose >= 4:
self._print_separator()
else:
print("\n")
print(self.indent(1) + " All candidate kernels have been run:")
msg = (
self.indent(2)
+ "Collected {} bench results (kept={}, pruned={}, failed={})."
)
msg = msg.format(total_count, kept_count, pruned_count, failed_count)
print(msg)
msg = (
self.indent(2)
+ "Building binary tree optimizer out of {} best candidates."
)
msg = msg.format(keep_only)
print(msg)
def _print_step(self, step, candidates, nruns):
if self.verbose > 1:
msg = "\n AUTOTUNING STEP {} :: running {} candidates over {} runs"
msg = msg.format(step, candidates, nruns)
self._print_separator()
print(msg.upper())
def _print_candidate(self, failed, from_cache, total_count, abort):
if self.verbose == 2:
if total_count == 1:
sys.stdout.write(self.indent(2))
if failed:
sys.stdout.write("x")
elif from_cache:
sys.stdout.write(":")
else:
sys.stdout.write(".")
if abort:
sys.stdout.write("|")
if total_count % 40 == 0:
sys.stdout.write("\n" + self.indent(2))
elif total_count % 5 == 0:
sys.stdout.write(" ")
sys.stdout.flush()
def _print_full_candidate(
self, local_work_size, global_work_size, statistics, is_pruned, from_cache
):
if self.verbose > 3:
failed = statistics is None
if failed:
msg = "No statistics [KERNEL ERROR]"
elif is_pruned:
msg = f"{statistics} [PRUNED]"
else:
msg = f"{statistics} [KEPT]"
if from_cache:
indicator = ":"
else:
indicator = "|"
config = (
self.indent(3)
+ f"{indicator} L={local_work_size:^10}, G={global_work_size:^10}: {msg}"
)
print(config)
def _print_step_results(self, sorted_candidates, kernel_statistics_idx):
if self.verbose == 2:
best = sorted_candidates[0][1]
worst = sorted_candidates[-1][1]
print(self.indent(2) + f"worst candidate: {worst[kernel_statistics_idx]}")
print(self.indent(2) + f"best candidate: {best[kernel_statistics_idx]}")
def _print_footer(self, ellapsed, best_candidate):
if self.verbose:
(
best_extra_params,
best_work_size,
best_work_load,
best_global_size,
best_local_size,
_,
_,
best_stats,
_,
_,
_,
_,
_,
) = best_candidate
if self.verbose > 1:
if ellapsed is not None:
self._print_separator()
msg = "\n Autotuning successfully run in {}."
msg = msg.format(time2str(ellapsed))
print(msg)
self._print_separator()
id1 = self.indent(1)
print(f"\n|> BEST OVERALL RESULT for kernel {self.name}:")
print(id1 + " => Extra params:")
for ep, val in best_extra_params.items():
print(self.indent(2) + f"*{ep}: {val}")
msg = id1 + " => WL={} G={} L={}"
msg = msg.format(best_work_load, best_global_size, best_local_size)
print(msg)
print(id1 + f" => Execution statistics: {best_stats}")
self._print_separator()
print()